/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.curator.framework.recipes.leader;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * <p>
 * Abstraction to select a "leader" amongst multiple contenders in a group of JVMs connected
 * to a Zookeeper cluster. If a group of N thread/processes contends for leadership, one will
 * be assigned leader until it releases leadership at which time another one from the group will
 * be chosen.
 * </p>
 * <p>
 * Note that this class uses an underlying {@link InterProcessMutex} and as a result leader
 * election is "fair" - each user will become leader in the order originally requested
 * (from ZK's point of view).
 * </p>
 */
public class LeaderSelector implements Closeable {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final CuratorFramework client;
    // the user's listener, wrapped in a WrappedListener by the constructor
    private final LeaderSelectorListener listener;
    private final CloseableExecutorService executorService;
    // distributed lock used for the election: the instance holding the lock is the leader
    private final InterProcessMutex mutex;
    // lifecycle state of this instance; prevents start() from being called more than once
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
    // true means that once the election task ends, this instance re-queues itself for leadership
    private final AtomicBoolean autoRequeue = new AtomicBoolean(false);
    // the most recently submitted election task (null until queued, and again after close())
    private final AtomicReference<Future<?>> ourTask = new AtomicReference<Future<?>>(null);

    private volatile boolean hasLeadership;
    private volatile String id = "";

    // hooks used by tests only
    @VisibleForTesting
    volatile CountDownLatch debugLeadershipLatch = null;
    @VisibleForTesting
    volatile CountDownLatch debugLeadershipWaitLatch = null;

    // lifecycle states of an instance; only LATENT -> STARTED -> CLOSED transitions are made
    private enum State {
        LATENT,
        STARTED,
        CLOSED
    }

    // guarded by synchronization
    private boolean isQueued = false;

    private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("LeaderSelector");

    /**
     * @param client     the client
     * @param leaderPath the path for this leadership group
     * @param listener   listener
     */
    public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener) {
        this(client, leaderPath,
            new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true), listener);
    }

    /**
     * @param client        the client
     * @param leaderPath    the path for this leadership group
     * @param threadFactory factory to use for making internal threads
     * @param executor      the executor to run in
     * @param listener      listener
     * @deprecated This constructor was poorly thought out. Custom executor is useless. Use this version instead:
     * {@link #LeaderSelector(CuratorFramework, String, ExecutorService, LeaderSelectorListener)}
     */
    @SuppressWarnings("UnusedParameters")
    @Deprecated
    public LeaderSelector(CuratorFramework client, String leaderPath, ThreadFactory threadFactory, Executor executor,
                          LeaderSelectorListener listener) {
        this(client, leaderPath, new CloseableExecutorService(wrapExecutor(executor), true), listener);
    }

    /**
     * @param client          the client
     * @param leaderPath      the path for this leadership group
     * @param executorService thread pool to use
     * @param listener        listener
     */
    public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService,
                          LeaderSelectorListener listener) {
        this(client, leaderPath, new CloseableExecutorService(executorService), listener);
    }

    /**
     * @param client          the client
     * @param leaderPath      the path for this leadership group
     * @param executorService thread pool to use
     * @param listener        listener
     */
    public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService,
                          LeaderSelectorListener listener) {
        Preconditions.checkNotNull(client, "client cannot be null");
        Preconditions.checkNotNull(leaderPath, "leaderPath cannot be null");
        Preconditions.checkNotNull(listener, "listener cannot be null");

        this.client = client;
        this.listener = new WrappedListener(this, listener);
        hasLeadership = false;

        this.executorService = executorService;

        // the election lock lives at leaderPath; each lock node carries this instance's id (if one was set)
        mutex = new InterProcessMutex(client, leaderPath) {
            @Override
            protected byte[] getLockNodeBytes() {
                return (id.length() > 0) ? getIdBytes(id) : null;
            }
        };
    }

    /**
     * Encode a participant id as UTF-8 bytes.
     *
     * @param id the id to encode
     * @return UTF-8 encoded bytes of the id
     */
    static byte[] getIdBytes(String id) {
        try {
            return id.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new Error(e); // this should never happen
        }
    }

    /**
     * Enable automatic re-queuing: after this is called, the instance queues itself
     * for leadership again every time its election task finishes.
     */
    public void autoRequeue() {
        autoRequeue.set(true);
    }

    /**
     * Sets the ID to store for this leader. Will be the value returned
     * when {@link #getParticipants()} is called. IMPORTANT: must be called
     * prior to {@link #start()} to have effect.
     *
     * @param id ID
     */
    public void setId(String id) {
        Preconditions.checkNotNull(id, "id cannot be null");

        this.id = id;
    }

    /**
     * Return the ID that was set via {@link #setId(String)}
     *
     * @return id
     */
    public String getId() {
        return id;
    }

    /**
     * Attempt leadership. The attempt is made in the background (the election task is
     * submitted to the executor service), so this method returns immediately.
     * This method may be called only once per instance; use {@link #requeue()} to
     * queue for leadership again afterwards.
     *
     * @throws IllegalStateException if the instance was already started or closed, the
     *                               executor service is shut down, or leadership is already held
     */
    public void start() {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        Preconditions.checkState(!executorService.isShutdown(), "Already started");
        Preconditions.checkState(!hasLeadership, "Already has leadership");

        // register the wrapped listener (built in the constructor) for connection state changes
        client.getConnectionStateListenable().addListener(listener);
        requeue();
    }

    /**
     * Re-queue an attempt for leadership. If this instance is already queued, nothing
     * happens and false is returned. If the instance was not queued, it is re-queued
     * and true is returned.
     *
     * @return true if this call queued the instance, false if it was already queued
     * @throws IllegalStateException if the instance is not in the started state
     */
    public boolean requeue() {
        Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
        return internalRequeue();
    }

    /**
     * Submit the election task unless one is already queued or the instance is not started.
     * Unlike {@link #requeue()} this never throws on a non-started instance; it returns false.
     *
     * @return true if a new election task was submitted
     */
    public synchronized boolean internalRequeue() {
        // only queue when started and not already queued
        if (!isQueued && (state.get() == State.STARTED)) {
            isQueued = true;
            // run the election asynchronously
            Future<Void> task = executorService.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    try {
                        doWorkLoop();
                    } finally {
                        // mark as no longer queued so the instance can queue again
                        clearIsQueued();
                        // with autoRequeue enabled, immediately contend for leadership again
                        if (autoRequeue.get()) {
                            internalRequeue();
                        }
                    }
                    return null;
                }
            });
            ourTask.set(task);

            return true;
        }
        return false;
    }

    /**
     * Shutdown this selector and remove yourself from the leadership group.
     * Removes the connection state listener, closes the executor service and
     * drops the reference to the current election task.
     *
     * @throws IllegalStateException if the instance was never started or is already closed
     */
    public synchronized void close() {
        Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED),
            "Already closed or has not been started");

        client.getConnectionStateListenable().removeListener(listener);
        executorService.close();
        ourTask.set(null);
    }

    /**
     * <p>
     * Returns the set of current participants in the leader selection
     * </p>
     * <p>
     * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
     * return a value that does not match {@link #hasLeadership()} as hasLeadership
     * uses a local field of the class.
     * </p>
     *
     * @return participants
     * @throws Exception ZK errors, interruptions, etc.
     */
    public Collection<Participant> getParticipants() throws Exception {
        Collection<String> participantNodes = mutex.getParticipantNodes();

        return getParticipants(client, participantNodes);
    }

    /**
     * Build the participant list for the given lock node paths. Nodes that no longer
     * exist when they are read are skipped; the first node that can still be read is
     * the one marked as leader.
     *
     * @param client           the client used to read the nodes
     * @param participantNodes paths of the participant nodes, leader first
     * @return immutable list of participants
     * @throws Exception ZK errors, interruptions, etc.
     */
    static Collection<Participant> getParticipants(CuratorFramework client, Collection<String> participantNodes)
        throws Exception {
        ImmutableList.Builder<Participant> builder = ImmutableList.builder();

        boolean isLeader = true;
        for (String path : participantNodes) {
            try {
                builder.add(participantForPath(client, path, isLeader));
                // Only give up the leader mark once a participant was really added; if the
                // first node vanished, the next surviving node is the leader.
                isLeader = false;
            } catch (KeeperException.NoNodeException ignore) {
                // the node disappeared between listing and reading - skip it
            }
        }

        return builder.build();
    }

    /**
     * <p>
     * Return the id for the current leader. If for some reason there is no
     * current leader, a dummy participant is returned.
     * </p>
     * <p>
     * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
     * return a value that does not match {@link #hasLeadership()} as hasLeadership
     * uses a local field of the class.
     * </p>
     *
     * @return leader
     * @throws Exception ZK errors, interruptions, etc.
     */
    public Participant getLeader() throws Exception {
        Collection<String> participantNodes = mutex.getParticipantNodes();
        return getLeader(client, participantNodes);
    }

    /**
     * Return the leader among the given lock node paths: the first node that can still
     * be read. If there are no nodes, or every node vanished before it could be read,
     * a dummy participant is returned.
     *
     * @param client           the client used to read the nodes
     * @param participantNodes paths of the participant nodes, leader first
     * @return the leader, or a dummy participant when there is none
     * @throws Exception ZK errors, interruptions, etc.
     */
    static Participant getLeader(CuratorFramework client, Collection<String> participantNodes) throws Exception {
        for (String path : participantNodes) {
            try {
                return participantForPath(client, path, true);
            } catch (KeeperException.NoNodeException ignore) {
                // the node disappeared between listing and reading - the next one is now first in line
            }
        }
        return new Participant();
    }

    /**
     * Return true if leadership is currently held by this instance
     *
     * @return true/false
     */
    public boolean hasLeadership() {
        return hasLeadership;
    }

    /**
     * Attempt to cancel and interrupt the current leadership if this instance has leadership.
     * This cancels (with interruption) the most recently submitted election task, if any.
     */
    public synchronized void interruptLeadership() {
        Future<?> task = ourTask.get();
        if (task != null) {
            task.cancel(true);
        }
    }

    /**
     * Read the node at the given path and turn its data into a participant.
     *
     * @param client       the client used to read the node
     * @param path         path of the participant node
     * @param markAsLeader whether the participant is flagged as the leader
     * @return the participant whose id is the node data decoded as UTF-8
     * @throws Exception ZK errors (e.g. the node no longer exists), interruptions, etc.
     */
    private static Participant participantForPath(CuratorFramework client, String path, boolean markAsLeader)
        throws Exception {
        byte[] bytes = client.getData().forPath(path);
        String thisId = new String(bytes, "UTF-8");
        return new Participant(thisId, markAsLeader);
    }

    /**
     * One election round: block until the lock is acquired, run the listener's
     * {@code takeLeadership()} while holding it, then release the lock.
     *
     * @throws Exception if acquiring the lock fails or the thread is interrupted
     */
    @VisibleForTesting
    void doWork() throws Exception {
        hasLeadership = false;
        // tracks whether the lock is actually held, so release() is only attempted when it can succeed
        boolean acquired = false;
        try {
            // blocks until this instance owns the lock, i.e. has become the leader
            mutex.acquire();
            acquired = true;
            hasLeadership = true;
            try {
                // test hooks
                if (debugLeadershipLatch != null) {
                    debugLeadershipLatch.countDown();
                }
                if (debugLeadershipWaitLatch != null) {
                    debugLeadershipWaitLatch.await();
                }
                // leadership lasts until this call returns; the listener should not return
                // until it wants to give leadership up
                listener.takeLeadership(client);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw e;
            } catch (Throwable e) {
                log.error("The leader threw an exception", e);
            } finally {
                // the instance may queue for leadership again from here on
                clearIsQueued();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        } catch (Exception e) {
            log.error("mutex.acquire() threw an exception", e);
            throw e;
        } finally {
            hasLeadership = false;
            if (acquired) {
                // Clear any pending interrupt so the release is not aborted by it;
                // the status is restored below so callers still observe the interruption.
                boolean wasInterrupted = Thread.interrupted();
                try {
                    // releasing the lock lets the other contenders compete for leadership
                    mutex.release();
                } catch (Exception e) {
                    // best effort only - never let a failed release mask the original outcome
                    log.error("mutex.release() threw an exception", e);
                } finally {
                    if (wasInterrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /**
     * Run {@link #doWork()} and decide which failures are reported to the caller.
     * Connection loss and session expiration are rethrown only when autoRequeue is
     * off; an interruption is not rethrown, the thread is just re-interrupted.
     *
     * @throws Exception any other failure from doWork(), or connection loss / session
     *                   expiration when autoRequeue is off
     */
    private void doWorkLoop() throws Exception {
        KeeperException exception = null;
        try {
            doWork();
        } catch (KeeperException.ConnectionLossException e) {
            exception = e;
        } catch (KeeperException.SessionExpiredException e) {
            exception = e;
        } catch (InterruptedException ignore) {
            Thread.currentThread().interrupt();
        }

        // autoRequeue should ignore connection loss or session expired and just keep trying
        if ((exception != null) && !autoRequeue.get()) {
            throw exception;
        }
    }

    // marks this instance as not queued so that a later (internal)requeue can submit a new task
    private synchronized void clearIsQueued() {
        isQueued = false;
    }

    // temporary wrapper for deprecated constructor
    private static ExecutorService wrapExecutor(final Executor executor) {
        return new AbstractExecutorService() {
            private volatile boolean isShutdown = false;
            private volatile boolean isTerminated = false;

            @Override
            public void shutdown() {
                isShutdown = true;
            }

            @Override
            public List<Runnable> shutdownNow() {
                return Lists.newArrayList();
            }

            @Override
            public boolean isShutdown() {
                return isShutdown;
            }

            @Override
            public boolean isTerminated() {
                return isTerminated;
            }

            @Override
            public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
                throw new UnsupportedOperationException();
            }

            @Override
            public void execute(Runnable command) {
                try {
                    executor.execute(command);
                } finally {
                    // the wrapper reports itself shut down and terminated once a command was handed over
                    isShutdown = true;
                    isTerminated = true;
                }
            }
        };
    }

    /**
     * Listener wrapper. Delegates to the user's listener and, when its
     * {@code stateChanged()} throws {@link CancelLeadershipException}, interrupts
     * the current leadership of the owning selector.
     */
    private static class WrappedListener implements LeaderSelectorListener {
        private final LeaderSelector leaderSelector;
        private final LeaderSelectorListener listener;

        public WrappedListener(LeaderSelector leaderSelector, LeaderSelectorListener listener) {
            this.leaderSelector = leaderSelector;
            this.listener = listener;
        }

        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            listener.takeLeadership(client);
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            try {
                listener.stateChanged(client, newState);
            } catch (CancelLeadershipException dummy) {
                leaderSelector.interruptLeadership();
            }
        }
    }
}
